﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using mqtttLib.Helper;
using mqtttLib.Helper.TcpSocket;
using mqtttLib.Messages;
using mqtttLib.Messages.Base;

namespace mqtttLib
{
    /// <summary>
    /// Client information: one record per connected client, as stored in the
    /// server's session table and in the per-topic subscriber lists.
    /// </summary>
    public class ClientModel
    {
        /// <summary>
        /// Remote address of the client connection.
        /// </summary>
        public EndPoint Address { get; set; }
        /// <summary>
        /// Client information (the CONNECT message received from the client).
        /// </summary>
        public ConnectMessage Info { get; set; }

        /// <summary>
        /// Time of the last communication with the client.
        /// </summary>
        public DateTime LastTime { get; set; }

        ///// <summary>
        ///// Subscribed topics
        ///// </summary>
        //public List<string> SubscribeTopic { get; set; }
    }

    /// <summary>
    /// A PUBLISH message paired with the connection it was received from,
    /// used as the element type of the server's pending-send queue.
    /// </summary>
    public class PublishMeodel
    {
        /// <summary>The PUBLISH message to forward.</summary>
        public PublishMessage Message { get; set; }
        /// <summary>The connection that sent the message (used to reply to the publisher).</summary>
        public IDataTransmit Sender { get; set; }
    }
    public class MqttServer : HandleServer
    {
        /// <summary>
        /// TCP service. Set to null by <see cref="Stop"/>.
        /// </summary>
        private TcpService _server;

        /// <summary>
        /// Client sessions, addressable either by client id (string) or by remote end point.
        /// NOTE(review): accessed from several threads without synchronization - confirm
        /// whether DoubleKeyDictionary is thread-safe.
        /// </summary>
        public DoubleKeyDictionary<string, EndPoint, ClientModel> Session = new DoubleKeyDictionary<string, EndPoint, ClientModel>();

        /// <summary>
        /// Topic collection: topic name mapped to the linked list of subscribed clients.
        /// </summary>
        public Dictionary<string, LinkedList<ClientModel>> Topic = new Dictionary<string, LinkedList<ClientModel>>();
 
        /// <summary>
        /// Log manager; may be null, in which case nothing is written to it.
        /// </summary>
        protected readonly ILog LogManager;
      

        /// <summary>
        /// Messages waiting to be sent (forwarded to subscribers).
        /// </summary>
        private readonly Queue<PublishMeodel> _sendMessage = new Queue<PublishMeodel>();
        /// <summary>
        /// Messages waiting for acknowledgement.
        /// NOTE(review): only referenced from commented-out code in this file - appears unused.
        /// </summary>
        private readonly LinkedList<DetermineMessage> _determineMessage = new LinkedList<DetermineMessage>();
        /// <summary>
        /// Base constructor: stores the logger only. No TCP service is created here.
        /// </summary>
        /// <param name="log">Logger to use; may be null.</param>
        protected MqttServer(ILog log)
        {
            LogManager = log;
        }

        /// <summary>
        /// Raised for every log entry, with the entry title and the related message (may be null).
        /// </summary>
        public event Action<string, MqttMessage> LogEvent;

        /// <summary>
        /// Writes a log entry to the log manager (when one is configured) and then
        /// notifies <see cref="LogEvent"/> subscribers (when there are any).
        /// </summary>
        /// <param name="title">Entry title.</param>
        /// <param name="message">Message the entry refers to; may be null.</param>
        protected virtual void Log(string title, MqttMessage message)
        {
            if (LogManager != null)
            {
                LogManager.Info(title, message);
            }

            Action<string, MqttMessage> listeners = LogEvent;
            if (listeners != null)
            {
                listeners(title, message);
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> and writes it to <paramref name="sender"/>.
        /// A successful write is also logged.
        /// </summary>
        /// <param name="sender">Connection to write to.</param>
        /// <param name="message">Message to send.</param>
        /// <returns>true when the connection reported the data as sent; otherwise false.</returns>
        public bool Send(IDataTransmit sender, MqttMessage message)
        {
            bool delivered = sender.Send(message.GetDataBytes());
            if (!delivered)
            {
                return false;
            }

            Log(sender.RemoteEndPoint + " 返回数据 ", message);
            return true;
        }
        //public bool SendRestart()
        //{
        //    foreach (var item in Session.Values2)
        //    {

        //        //  item.Value.
        //        if (Session.ContainsKey(item.Key))
        //        {
        //            var clientModel = Session.Get(item.Key);
        //            var client = _server.Session[clientModel.Address];

        //            RestartMessage message = new RestartMessage();
        //            Send(client, message);
        //            Log(item.Key + " 返回数据 ", message);

        //        }
        //    }
        //    return true;
        //}
        /// <summary>
        /// Sends a restart message to the client registered under <paramref name="clientId"/>.
        /// </summary>
        /// <param name="clientId">Client identifier used as the session key.</param>
        /// <returns>
        /// true when the message was sent; false when the server has been stopped,
        /// the client id is null/unknown, or the send failed.
        /// </returns>
        public bool SendRestart(string clientId)
        {
            // Stop() sets _server to null; take a local copy so the check and the use agree.
            var server = _server;
            if (server == null || clientId == null || !Session.ContainsKey(clientId))
            {
                return false;
            }

            var address = Session.GetK(clientId);
            var client = server.Session[address];

            // Send already logs a successful write, so no second log entry is written here.
            return Send(client, new RestartMessage());
        }

        /// <summary>
        /// Constructor. Creates the TCP service for <paramref name="port"/>, hooks its
        /// connect/disconnect events and starts two background loops:
        /// a keep-alive sweep that runs once a minute, and a sender loop that drains
        /// the pending-publish queue. Listening itself begins when <see cref="Start"/> is called.
        /// </summary>
        /// <param name="port">TCP port passed to the TCP service.</param>
        /// <param name="log">Optional logger; may be null.</param>
        public MqttServer(int port, ILog log = null) : this(log)
        {
            // Create the TCP service on the specified port.
            _server = new TcpService(port);
            _server.Connected += server_Connected;
            _server.DisConnect += server_DisConnect;

            // Heartbeat loop. LongRunning because the loop never returns and would
            // otherwise occupy a thread-pool worker for the lifetime of the process.
            var heartbeatTask = new Task(() =>
            {
                while (true)
                {
                    Thread.Sleep(1000 * 60 * 1);

                    try
                    {
                        // Work on a snapshot so sessions can be removed while iterating.
                        Dictionary<string, DoubleKey<string, EndPoint, ClientModel>> temp = Session.Values1.ToDictionary(item => item.Key, item => item.Value);
                        DateTime date = DateTime.Now;
                        foreach (var item in temp)
                        {
                            ClientModel client = item.Value.V;

                            // In MQTT a keep-alive of 0 switches the mechanism off;
                            // such clients must never be expired.
                            if (client.Info.KeepAlive == 0)
                            {
                                continue;
                            }

                            // Expire after one and a half times the keep-alive period.
                            if (client.LastTime.AddSeconds(client.Info.KeepAlive * 1.5) < date)
                            {
                                RemoveTopic(item.Value.K2);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        // Best effort: keep the sweep alive, but do not hide the failure.
                        LogManager?.Error(ex);
                    }
                }
                // ReSharper disable once FunctionNeverReturns
            }, TaskCreationOptions.LongRunning);

            // Sender loop: hands queued publishes to Forward, one per iteration.
            // NOTE: redelivery of unacknowledged messages (_determineMessage) is not implemented.
            var messageSenTask = new Task(() =>
            {
                while (true)
                {
                    try
                    {
                        PublishMeodel message = null;
                        bool dequeued = false;

                        // Queue<T> is not thread-safe and other threads enqueue to it,
                        // so the count check and the dequeue are done under one lock.
                        lock (_sendMessage)
                        {
                            if (_sendMessage.Count > 0)
                            {
                                message = _sendMessage.Dequeue();
                                dequeued = true;
                            }
                        }

                        if (dequeued)
                        {
                            Forward(message);
                        }
                        else
                        {
                            Thread.Sleep(100);
                        }
                    }
                    catch (Exception ex)
                    {
                        LogManager?.Error(ex);
                    }

                    // Pacing delay; also throttles messages that Forward puts back on the queue.
                    Thread.Sleep(100);
                }
                // ReSharper disable once FunctionNeverReturns
            }, TaskCreationOptions.LongRunning);
            messageSenTask.Start();
            heartbeatTask.Start();

        }

        /// <summary>
        /// Forwards a queued QoS 1 / QoS 2 publish to every subscriber of its topic on a
        /// worker thread and then acknowledges the publisher. When the topic exists but
        /// has no subscribers the message is put back on the queue; when the topic does
        /// not exist the message is dropped.
        /// </summary>
        /// <param name="message">Queued publish; null is ignored.</param>
        private async void Forward(PublishMeodel message)
        {
            if (message == null)
            {
                return;
            }

            try
            {
                await Task.Run(() =>
                {
                    LinkedList<ClientModel> clientS;
                    if (!Topic.TryGetValue(message.Message.TopicName, out clientS))
                    {
                        return; // topic no longer exists
                    }

                    if (clientS.Count == 0)
                    {
                        // No subscribers yet: keep the message for a later attempt.
                        // Queue<T> is not thread-safe, so guard the enqueue.
                        lock (_sendMessage)
                        {
                            _sendMessage.Enqueue(message);
                        }
                        return;
                    }

                    switch (message.Message.FixedHeader.Qos)
                    {
                        case Qos.AtMostOnce: // at most once: nothing to do here
                            break;
                        case Qos.AtLeastOnce: // at least once
                        case Qos.ExactlyOnce: // exactly once - handled the same way as QoS 1
                            var packetId = message.Message.PacketIdentifier;

                            // Push to every subscriber that still has a session.
                            foreach (var item in clientS)
                            {
                                if (Session.ContainsKey(item.Address))
                                {
                                    var clientModel = Session.Get(item.Address);
                                    var client = _server.Session[clientModel.Address];
                                    Send(client, message.Message);
                                }
                            }

                            // Reply to the publisher.
                            // NOTE(review): QoS 2 is answered with the same acknowledgement as QoS 1;
                            // confirm whether a PUBREC/PUBREL/PUBCOMP exchange is intended.
                            var ack = new PublishAckMessage
                            {
                                FixedHeader = { Qos = Qos.AtLeastOnce },
                                PacketIdentifier = packetId
                            };
                            Send(message.Sender, ack);
                            break;
                    }
                });
            }
            catch (Exception ex)
            {
                // This is an async void method: an exception escaping it cannot be
                // observed by the caller, so it is logged here instead.
                LogManager?.Error(ex);
            }
        }

        /// <summary>
        /// Connection opened: subscribes to the connection's data event, starts
        /// receiving and logs the new connection. Failures are logged, not thrown.
        /// </summary>
        /// <param name="sender">The newly connected transport.</param>
        /// <param name="e">Event data (not used).</param>
        void server_Connected(IDataTransmit sender, NetEventArgs e)
        {
            try
            {
                sender.ReceiveData += ReceiveData;

                // Begin receiving data.
                sender.Start();

                string title = $"{sender.RemoteEndPoint} 连接成功";
                Log(title, null);
            }
            catch (Exception ex)
            {
                LogManager?.Error(ex);
            }
        }

        /// <summary>
        /// Connection closed: drops the client's subscriptions and session, then logs the disconnect.
        /// </summary>
        /// <param name="sender">The transport that disconnected.</param>
        /// <param name="e">Event data (not used).</param>
        void server_DisConnect(IDataTransmit sender, NetEventArgs e)
        {
            RemoveTopic(sender.RemoteEndPoint);

            string title = $"{DateTime.Now.ToLongTimeString()} {sender.RemoteEndPoint} 连接断开";
            Log(title, null);
        }
        /// <summary>
        /// Handles a DISCONNECT message: drops the client's subscriptions and session,
        /// then logs the disconnect together with the message.
        /// </summary>
        /// <param name="sender">Connection the message arrived on.</param>
        /// <param name="message">The DISCONNECT message.</param>
        public override void DisconnectMethod(IDataTransmit sender, DisconnectMessage message)
        {
            RemoveTopic(sender.RemoteEndPoint);

            string title = $"{DateTime.Now.ToLongTimeString()} {sender.RemoteEndPoint} 连接断开";
            Log(title, message);
        }
        /// <summary>
        /// Removes every subscription held by the client at <paramref name="endPoint"/>
        /// (deleting topics that become empty) and then removes the client's session.
        /// Called on disconnect, on keep-alive expiry and when a client id reconnects.
        /// </summary>
        /// <param name="endPoint">Remote end point identifying the client.</param>
        /// <returns>false when <paramref name="endPoint"/> is null; otherwise true.</returns>
        public bool RemoveTopic(EndPoint endPoint)
        {
            if (endPoint == null)
            {
                return false;
            }

            // Iterate over a snapshot of the keys because topics may be removed inside the loop.
            foreach (var topicName in Topic.Keys.ToArray())
            {
                LinkedList<ClientModel> subscribers = Topic[topicName];

                // EndPoint does not overload ==, so compare with Equals: an equal but
                // distinct end point instance must still match.
                ClientModel match = subscribers.FirstOrDefault(c => endPoint.Equals(c.Address));
                if (match == null)
                {
                    continue;
                }

                subscribers.Remove(match);
                if (subscribers.Count == 0)
                {
                    Topic.Remove(topicName);
                }
            }

            if (Session.ContainsKey(endPoint))
            {
                Session.Remove(endPoint);
            }
            return true;
        }
        /// <summary>
        /// Handles data received on a connection: passes the raw bytes to
        /// <c>Handle</c> and logs the resulting message. Failures are logged, not thrown.
        /// </summary>
        /// <param name="sender">Connection the data arrived on.</param>
        /// <param name="e">Event data; <c>EventArg</c> carries the received bytes.</param>
        private void ReceiveData(IDataTransmit sender, NetEventArgs e)
        {
            try
            {
                var payload = (byte[])e.EventArg;
                var parsed = Handle(sender, payload);

                // NOTE: matching acknowledgements against _determineMessage is not implemented.
                Log($"{sender.RemoteEndPoint} 接收数据 ", parsed);
            }
            catch (Exception ex)
            {
                LogManager?.Error(ex);
            }
        }
        /// <summary>
        /// Starts listening by starting the underlying TCP service.
        /// </summary>
        public void Start() => _server.Start();

        /// <summary>
        /// Stops listening: closes the TCP service and clears the reference to it.
        /// Does nothing when the service has already been released.
        /// </summary>
        public void Stop()
        {
            if (_server == null)
            {
                return;
            }

            _server.Close();
            _server = null;
        }

        /// <summary>
        /// Connection validation: invoked with (user name, password) while handling CONNECT.
        /// Returning false rejects the connection. When no handler is attached, no
        /// credential check is performed.
        /// </summary>
        public event Func<string, string, bool> ConnectValidatEvent;

        /// <summary>
        /// Handles a CONNECT message. Checks, in order, the protocol name/level, the
        /// client id length and (when a validation handler is attached) the credentials;
        /// on success registers the session, replacing any existing session with the
        /// same client id. A CONNACK carrying the outcome is always sent back.
        /// </summary>
        /// <param name="sender">Connection the message arrived on.</param>
        /// <param name="message">The CONNECT message; null is ignored.</param>
        public override void ConnectMethod(IDataTransmit sender, ConnectMessage message)
        {
            if (message == null)
            {
                return;
            }

            var result = new ConnAckMessage();

            // Ordinal, case-insensitive comparison: culture-sensitive ToUpper() can
            // mis-compare (e.g. the letter I under Turkish culture) and throws on null.
            bool knownProtocol =
                string.Equals(message.ProtocolName, "MQTT", StringComparison.OrdinalIgnoreCase) ||
                string.Equals(message.ProtocolName, "MQISDP", StringComparison.OrdinalIgnoreCase);

            if (knownProtocol && message.ProtocolVersion <= 0x04)
            {
                if (message.ClientId != null && message.ClientId.Length >= 6)
                {
                    // Copy the delegate so a concurrent unsubscribe cannot null it
                    // between the check and the call.
                    var validator = ConnectValidatEvent;
                    if (validator != null)
                    {
                        // Both credentials must be present and must pass validation.
                        bool hasCredentials = message.UsernameFlag && message.PasswordFlag;
                        if (!hasCredentials || !validator(message.UserName, message.Password))
                        {
                            result.ReturnCode = MqttConnectReturnCode.BadUsernameOrPassword; // bad user name or password
                        }
                    }

                    if (result.ReturnCode == MqttConnectReturnCode.ConnectionAccepted)
                    {
                        if (Session.ContainsKey(message.ClientId))
                        {
                            // The client id is already connected: drop the previous
                            // session together with its subscriptions.
                            var c = Session.Get(message.ClientId);
                            RemoveTopic(c.Address);
                        }

                        // Client record for the new session.
                        var client = new ClientModel
                        {
                            Address = sender.RemoteEndPoint,
                            Info = message,
                            LastTime = DateTime.Now
                        };
                        Session.Add(message.ClientId, sender.RemoteEndPoint, client);
                    }
                }
                else
                {
                    result.ReturnCode = MqttConnectReturnCode.IdentifierRejected; // client id missing or shorter than 6 characters
                }
            }
            else
            {
                result.ReturnCode = MqttConnectReturnCode.UnacceptedProtocolVersion; // unsupported protocol name or level
            }

            // Reply with the outcome.
            Send(sender, result);
        }

        /// <summary>
        /// 消息集合
        /// </summary>
        //  readonly Queue<PublishMessage> _publishMessage = new Queue<PublishMessage>();
        /// <summary>
        /// 消息集合
        /// </summary>
        //readonly Queue<PublishMessage> _q1Message = new Queue<PublishMessage>();

     

        /// <summary>
        /// Handles a PUBLISH message from a client that has a session.
        /// QoS 0 messages are forwarded to the topic's current subscribers immediately;
        /// QoS 1 and QoS 2 messages are queued for the background sender loop.
        /// A topic entry is created when the topic is not known yet.
        /// Messages from connections without a session are ignored.
        /// </summary>
        /// <param name="sender">Connection the message arrived on.</param>
        /// <param name="message">The PUBLISH message.</param>
        public override void PublishMethod(IDataTransmit sender, PublishMessage message)
        {
            if (!Session.ContainsKey(sender.RemoteEndPoint))
            {
                return; // no session (not connected or expired)
            }

            // Any packet from the client counts as activity for the keep-alive check,
            // not only PINGREQ; otherwise an actively publishing client gets expired.
            Session.Get(sender.RemoteEndPoint).LastTime = DateTime.Now;

            LinkedList<ClientModel> clientS;
            if (!Topic.TryGetValue(message.TopicName, out clientS))
            {
                // First time this topic is seen: register it with no subscribers.
                clientS = new LinkedList<ClientModel>();
                Topic.Add(message.TopicName, clientS);
            }

            switch (message.FixedHeader.Qos)
            {
                case Qos.AtMostOnce: // at most once
                    if (clientS.Count == 0)
                    {
                        break; // nobody subscribed: nothing to forward
                    }

                    // Push the message to every subscriber that still has a session.
                    foreach (var item in clientS)
                    {
                        if (Session.ContainsKey(item.Address))
                        {
                            var clientModel = Session.Get(item.Address);
                            var client = _server.Session[clientModel.Address];
                            Send(client, message);
                        }
                    }

                    // Reply to the publisher.
                    // NOTE(review): MQTT defines no response for QoS 0 publishes; kept
                    // because existing clients may rely on it - confirm.
                    var mes = new PublishAckMessage();
                    Send(sender, mes);
                    break;

                case Qos.AtLeastOnce: // at least once
                case Qos.ExactlyOnce: // exactly once
                    // Queue<T> is not thread-safe and the sender loop dequeues on
                    // another thread, so guard the enqueue.
                    lock (_sendMessage)
                    {
                        _sendMessage.Enqueue(new PublishMeodel { Message = message, Sender = sender });
                    }
                    break;
            }
        }

        ///// <summary>
        ///// 推送处理
        ///// </summary>
        //public void Publish()
        //{


        //}

        /// <summary>
        /// Handles a SUBSCRIBE message: adds the client to each requested topic
        /// (creating the topic when needed, skipping topics it already subscribes to)
        /// and replies with a SUBACK that echoes each requested QoS.
        /// Messages from connections without a session are ignored.
        /// </summary>
        /// <param name="sender">Connection the message arrived on.</param>
        /// <param name="message">The SUBSCRIBE message.</param>
        public override void SubscribeMethod(IDataTransmit sender, SubscribeMessage message)
        {
            if (!Session.ContainsKey(sender.RemoteEndPoint))
            {
                return;
            }

            // Resolve the client once instead of once per subscriber per topic.
            string clientId = Session.GetK(sender.RemoteEndPoint);
            ClientModel subscriber = Session.Get(clientId);

            // Any packet from the client counts as activity for the keep-alive check.
            subscriber.LastTime = DateTime.Now;

            // One return code per requested topic, in request order.
            SubscribeAckCode[] data = new SubscribeAckCode[message.Topics.Count];
            int i = 0;
            foreach (var item in message.Topics)
            {
                LinkedList<ClientModel> subscribers;
                if (!Topic.TryGetValue(item.Topic, out subscribers))
                {
                    // First subscription to this topic.
                    subscribers = new LinkedList<ClientModel>();
                    Topic.Add(item.Topic, subscribers);
                }

                // Do not subscribe the same client twice.
                if (!subscribers.Contains(subscriber))
                {
                    subscribers.AddLast(subscriber);
                }

                data[i] = (SubscribeAckCode)((byte)item.Qos);
                i++;
            }

            // Reply.
            var result = new SubscribeAckMessage
            {
                Payload = data,
                FixedHeader = { Qos = Qos.AtLeastOnce },
                PacketIdentifier = message.PacketIdentifier
            };
            Send(sender, result);
        }



        /// <summary>
        /// Handles an UNSUBSCRIBE message: removes the client from each listed topic,
        /// deleting topics that are left without subscribers, and replies with an
        /// UNSUBACK. The acknowledgement is sent even when the connection has no session.
        /// </summary>
        /// <param name="sender">Connection the message arrived on.</param>
        /// <param name="message">The UNSUBSCRIBE message.</param>
        public override void UnsubscribeMethod(IDataTransmit sender, UnsubscribeMessage message)
        {
            if (Session.ContainsKey(sender.RemoteEndPoint))
            {
                // Resolve the client once instead of on every comparison.
                string clientId = Session.GetK(sender.RemoteEndPoint);
                ClientModel subscriber = Session.Get(clientId);

                // Any packet from the client counts as activity for the keep-alive check.
                subscriber.LastTime = DateTime.Now;

                foreach (var item in message.Topics)
                {
                    LinkedList<ClientModel> subscribers;
                    if (!Topic.TryGetValue(item, out subscribers))
                    {
                        continue; // unknown topic
                    }

                    // Remove returns false when the client was not subscribed.
                    if (subscribers.Remove(subscriber) && subscribers.Count == 0)
                    {
                        Topic.Remove(item);
                    }
                }
            }

            var result = new UnsubscribeAckMessage
            {
                FixedHeader = { Qos = Qos.AtLeastOnce },
                PacketIdentifier = message.PacketIdentifier
            };
            Send(sender, result);
        }

        /// <summary>
        /// Heartbeat: on PINGREQ from a client with a session, records the current time
        /// as its last activity and answers with PINGRESP. Connections without a
        /// session get no reply.
        /// </summary>
        /// <param name="sender">Connection the message arrived on.</param>
        /// <param name="message">The PINGREQ message (content not used).</param>
        public override void PingMethod(IDataTransmit sender, PingReqMessage message)
        {
            if (!Session.ContainsKey(sender.RemoteEndPoint))
            {
                return;
            }

            var session = Session.Get(sender.RemoteEndPoint);
            session.LastTime = DateTime.Now;

            // The reply goes out through the transport registered for the session's address.
            var transport = _server.Session[session.Address];
            Send(transport, new PingRespMessage());
        }
    }
}
